package net.gdface.facedb;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import gu.sql2java.TableManager.Action;
import gu.sql2java.manager.Managers;
import net.gdface.facedb.db.IImageManager;
import net.gdface.facedb.db.ImageBean;
import gu.sql2java.store.URLInfo;
import gu.sql2java.store.URLStore;

import static com.google.common.base.Preconditions.checkArgument;
import static net.gdface.facedb.TableManagerInitializer.INSTANCE;
import static net.gdface.facedb.StoreUtilConfig.STORE_CONFIG;
import static gu.sql2java.SimpleLog.log;
import static gu.sql2java.store.BinaryUtils.*;

public class StoreUtil implements LocalConstant{
	private final IImageManager imgmgr;
	private final Dao dao;
	private final ScheduledThreadPoolExecutor scheduledExecutor;
	private final ScheduledExecutorService timerExecutor;
	/** Thread pool that executes the per-image synchronization tasks */
	private final ExecutorService taskExecutor; 
	/**
	 * Creates the helper objects and the two executors used by {@code sync}:
	 * <ul>
	 * <li>a fixed pool of 4 worker threads named {@code store-task-%d}, fed by a bounded
	 * queue of 1000 entries; when the queue is full {@link WaitingPolicy} makes the
	 * submitter wait instead of failing</li>
	 * <li>a single-thread scheduler named {@code store-mn-%d} used for the periodic
	 * progress report</li>
	 * </ul>
	 * Both executors are exposed through the "exiting" wrappers of {@link MoreExecutors}.
	 */
	StoreUtil() {
		this.dao = new Dao();
		this.imgmgr = Managers.instanceOf(IImageManager.class);

		// worker pool: the rejection handler is supplied directly at construction time
		final ThreadPoolExecutor workerPool = new ThreadPoolExecutor(
				4, 4,
				0L, TimeUnit.MILLISECONDS,
				new ArrayBlockingQueue<Runnable>(1000),
				new ThreadFactoryBuilder().setNameFormat("store-task-%d").build(),
				new WaitingPolicy());
		this.taskExecutor = MoreExecutors.getExitingExecutorService(workerPool);

		// scheduler for the progress monitor
		this.scheduledExecutor = new ScheduledThreadPoolExecutor(
				1,
				new ThreadFactoryBuilder().setNameFormat("store-mn-%d").build());
		this.timerExecutor = MoreExecutors.getExitingScheduledExecutorService(this.scheduledExecutor);
	}

	/**
	 * Converts a textual location into a {@link URL}.
	 *
	 * @param input URL string, may be {@code null}
	 * @return the parsed URL, or {@code null} when {@code input} is {@code null}
	 * @throws RuntimeException wrapping the {@link MalformedURLException} raised
	 *         when {@code input} is not a valid URL
	 */
	private static final URL createURL(String input){
		if (input == null) {
			return null;
		}
		try {
			return new URL(input);
		} catch (MalformedURLException malformed) {
			throw new RuntimeException(malformed);
		}
	}
	/**
	 * Synchronizes the image data of all image records from one store to another and
	 * blocks until every record has been processed or the calling thread is interrupted.
	 * <p>
	 * For each record delivered by {@code imgmgr.loadAll} a {@link TransSyncer} is submitted
	 * to the worker pool and executed through {@code imgmgr.runAsTransaction}. While the
	 * tasks run, a {@link TaskMonitor} prints a progress line every 2 seconds; the monitor
	 * is cancelled when this method returns or throws.
	 * <p>
	 * If the calling thread is interrupted while waiting, the shared {@code stopped} flag is
	 * set so that tasks which have not started yet skip their work, the interrupt status is
	 * restored and the method returns without reporting completion.
	 * <p>
	 * NOTE(review): the completion latch is sized from {@code imgmgr.countAll()}; if
	 * {@code loadAll} delivers fewer rows than that count the wait never ends - confirm
	 * that the table is not modified by other parties while a synchronization is running.
	 *
	 * @param from source store, must not be {@code null}
	 * @param to target store, must not be {@code null}
	 * @param storeFirst {@code true}: store directly from the source URL, then remove the source;
	 *        {@code false}: read the bytes, remove the source, then store the bytes
	 * @param removeImageBeanIfAbsent {@code true} to delete the record of an image found in neither store
	 * @param isCopy {@code true} to keep the data in {@code from} (copy instead of move)
	 * @param overwrite forwarded to the {@code store} calls on {@code to}
	 * @param cascadeFeature forwarded to {@code dao.daoDeleteImage} when a record is deleted
	 * @throws IllegalArgumentException if {@code from} or {@code to} is {@code null}
	 */
	protected void sync(
			final URLStore from,
			final URLStore to,
			final boolean storeFirst,
			final boolean removeImageBeanIfAbsent,
			final boolean isCopy,
			final boolean overwrite,
			final boolean cascadeFeature){
		checkArgument(from != null,"from is null");
		checkArgument(to != null,"to is null");

		final int total = imgmgr.countAll();
		final CountDownLatch left = new CountDownLatch(total);
		final AtomicInteger okCount = new AtomicInteger(0);
		final AtomicInteger removeCount = new AtomicInteger(0);
		final AtomicInteger failCount = new AtomicInteger(0);
		final AtomicBoolean stopped = new AtomicBoolean(false);
		// Start the periodic progress report; keep the handle so it can be cancelled afterwards
		final ScheduledFuture<?> progress = this.timerExecutor.scheduleAtFixedRate(
				new TaskMonitor(total,left, okCount, removeCount, failCount),2,2,TimeUnit.SECONDS);
		try {
			log("IMAGE DATA synchronize TO %s", to.toString());
			log("SCAN ALL IMAGE ({})ROWS...(开始图像数据同步)", total);
			imgmgr.loadAll(new Action<ImageBean>(){
				@Override
				public void call(final ImageBean bean) {
					taskExecutor.submit(new Runnable() {
						@Override
						public void run() {
							imgmgr.runAsTransaction(new TransSyncer(from, to, isCopy, storeFirst, removeImageBeanIfAbsent, overwrite, cascadeFeature,bean, left, okCount, removeCount, failCount, stopped));
						}
					});
				}});
			try {
				left.await();
			} catch (InterruptedException e) {
				// Let tasks that have not started yet skip their work
				stopped.set(true);
				// Preserve the interrupt status for the caller
				Thread.currentThread().interrupt();
				log("IMAGE DATA synchronize INTERRUPTED(同步被中断)");
				return;
			}
			log("ALL ROWS synchronized(所有记录同步完成)");
		} finally {
			// Without this the monitor would keep printing after the synchronization ended
			progress.cancel(false);
		}
	}
	
	/**
	 * Synchronizes image data towards the given store type.
	 * <ul>
	 * <li>{@code FILE}: from {@code INSTANCE.storeTable} to {@code INSTANCE.storeLocal}, with {@code storeFirst=false}</li>
	 * <li>{@code DB}: from {@code INSTANCE.storeLocal} to {@code INSTANCE.storeTable}, with {@code storeFirst=true}</li>
	 * </ul>
	 *
	 * @param storeTarget target store type; when {@code null}, {@code INSTANCE.storeTarget} is used
	 * @param removeImageBeanIfAbsent passed through to the seven-argument {@code sync}
	 * @param isCopy passed through to the seven-argument {@code sync}
	 * @param overwrite passed through to the seven-argument {@code sync}
	 * @param cascadeFeature passed through to the seven-argument {@code sync}
	 * @throws IllegalArgumentException if the resolved target is neither {@code FILE} nor {@code DB}
	 */
	protected void sync(			
			StoreType storeTarget,
			boolean removeImageBeanIfAbsent,
			boolean isCopy,
			boolean overwrite, boolean cascadeFeature){
		URLStore from,to;
		boolean storeFirst;
		StoreType target = MoreObjects.firstNonNull(storeTarget, INSTANCE.storeTarget);
		switch (target) {
		case FILE:			
			from = INSTANCE.storeTable;
			to = INSTANCE.storeLocal;
			storeFirst = false;
			break;
		case DB:
			to = INSTANCE.storeTable;
			from = INSTANCE.storeLocal;
			storeFirst = true;
			break;
		default:
			// Report the value that was actually rejected, not the configured default
			throw new IllegalArgumentException(String.format("INVALID store target %s", target));
		}
		sync(from, to, storeFirst, removeImageBeanIfAbsent, isCopy, overwrite, cascadeFeature);
	}
	/**
	 * Command line entry point: parses the arguments into {@code STORE_CONFIG} and runs
	 * one synchronization with the parsed options.
	 *
	 * @param args command line arguments
	 */
	public static void main(String[] args) {
		STORE_CONFIG.parseCommandLine(args);
		final StoreUtil util = new StoreUtil();
		util.sync(
				STORE_CONFIG.getStoreTarget(),
				STORE_CONFIG.isRemoveImageBeanIfAbsent(),
				STORE_CONFIG.isCopy(),
				STORE_CONFIG.isOverwrite(),
				STORE_CONFIG.isCascadeFeature());
	}
	/**
	 * Progress reporter: every invocation of {@link #run()} prints one status line to
	 * standard output, terminated by a carriage return.
	 */
	private static class TaskMonitor implements Runnable{

		/** number of records expected in total */
		final long total;
		/** latch whose current count is the number of records still outstanding */
		final CountDownLatch left;
		/** counter printed first in the second group of the status line */
		final AtomicInteger okCount;
		/** counter printed with suffix {@code R} */
		final AtomicInteger removeCount;
		/** counter printed with suffix {@code E} */
		final AtomicInteger failCount;

		TaskMonitor(long total,CountDownLatch left, AtomicInteger okCount, AtomicInteger removeCount, AtomicInteger failCount) {
			this.okCount = okCount;
			this.removeCount = removeCount;
			this.failCount = failCount;
			this.left = left;
			this.total = total;
		}

		/**
		 * Prints {@code <processed>/<total>} followed by {@code <ok>/<removed>R/<failed>E}.
		 */
		@Override
		public void run() {
			final long processed = total - left.getCount();
			final int ok = okCount.get();
			final int removed = removeCount.get();
			final int failed = failCount.get();
			System.out.format("%d/%d\t\t%d/%dR/%dE\r", processed, total, ok, removed, failed);
		}

	}
	
	/**
	 * Synchronization task for a single image record.
	 * <p>
	 * When the image located by {@code bean.getLocation()} is present in {@code from}, the
	 * record is updated with the URL it will have in {@code to}, the data is stored in
	 * {@code to} and, unless {@code isCopy} is set, removed from {@code from}.
	 * When the image is present in neither store and {@code removeImageBeanIfAbsent} is set,
	 * the record is deleted through {@code dao.daoDeleteImage}.
	 * <p>
	 * The latch is counted down exactly once per invocation of {@link #run()}, whatever the
	 * outcome, so a thread awaiting it cannot be left blocked by a skipped or failed task.
	 */
	class TransSyncer implements Runnable{
		private final URLStore from;
		private final URLStore to;
		private final boolean isCopy;
		private final boolean storeFirst;
		private final boolean removeImageBeanIfAbsent;
		private final boolean overwrite;
		private final boolean cascadeFeature;
		private final ImageBean bean;
		private final CountDownLatch left;
		private final AtomicInteger okCount;
		private final AtomicInteger removeCount;
		private final AtomicInteger failCount;
		private final AtomicBoolean stopped;
		/**
		 * @param from source store
		 * @param to target store
		 * @param isCopy {@code true} to keep the data in {@code from}
		 * @param storeFirst {@code true}: store from the source URL, then delete the source;
		 *        {@code false}: read the bytes, delete the source, then store the bytes
		 * @param removeImageBeanIfAbsent {@code true} to delete the record when the image is in neither store
		 * @param overwrite forwarded to the {@code store} calls on {@code to}
		 * @param cascadeFeature forwarded to {@code dao.daoDeleteImage}
		 * @param bean record to process
		 * @param left latch counted down once when this task ends
		 * @param okCount incremented after a successful transfer
		 * @param removeCount incremented after a record was deleted
		 * @param failCount incremented when the task fails with an exception
		 * @param stopped when {@code true} at start of {@link #run()}, the task does no work
		 */
		public TransSyncer(
				URLStore from,
				URLStore to,
				boolean isCopy,
				boolean storeFirst,
				boolean removeImageBeanIfAbsent,
				boolean overwrite,
				boolean cascadeFeature,
				ImageBean bean, 
				CountDownLatch left, 
				AtomicInteger okCount, 
				AtomicInteger removeCount, 
				AtomicInteger failCount, 
				AtomicBoolean stopped) {
			this.from = from;
			this.to = to;
			this.isCopy = isCopy;
			this.storeFirst = storeFirst;
			this.removeImageBeanIfAbsent = removeImageBeanIfAbsent;
			this.overwrite = overwrite;
			this.cascadeFeature = cascadeFeature;
			this.bean = bean;
			this.left = left;
			this.okCount = okCount;
			this.removeCount = removeCount;
			this.failCount = failCount;
			this.stopped = stopped;
		}
		/**
		 * Processes the record. Any exception is printed, counted in {@code failCount} and
		 * rethrown (checked exceptions wrapped in {@link RuntimeException}).
		 */
		@Override
		public void run() {
			try {
				if(stopped.get()){
					// Cancelled: do no work, but still release the latch in the finally block
					return;
				}
				URL src = createURL(bean.getLocation());
				if(from.isStored(src) && from.exists(src)){
					// First compute the URL the data will be stored under
					URL stored = to.store(src, overwrite, true);
					// Update the record with the new storage URL
					bean.setLocation(stored.toString());
					imgmgr.save(bean);
					if(storeFirst){
						to.store(src, overwrite, false);
						if(!isCopy){
							from.delete(src);
						}
					}else{
						// Read the bytes before the source is deleted
						byte[] data = getBytesNotEmpty(src);
						if(!isCopy){
							from.delete(src);
						}
						to.store(data, null, URLInfo.wrap(src).extension, overwrite, false);
					}
					okCount.incrementAndGet();
				}else if(!(to.isStored(src) && to.exists(src))){
					// Image exists in neither store
					if(removeImageBeanIfAbsent){
						dao.daoDeleteImage(bean.getMd5(), cascadeFeature);
						removeCount.incrementAndGet();
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
				failCount.incrementAndGet();
				Throwables.throwIfUnchecked(e);
				throw new RuntimeException(e);
			}finally {
				// Always release the latch, including for skipped tasks
				left.countDown();
			}
		}
	}
	/**
	 * Rejection policy that makes the submitter wait.<br>
	 * A {@link java.util.concurrent.RejectedExecutionHandler} built on {@link ThreadPoolExecutor.AbortPolicy}:<br>
	 * when the work queue is full, the submitting thread blocks until the queue has room.<br>
	 * Tasks rejected while the executor is shut down are discarded silently.
	 * @author guyadong
	 *
	 */
	static class WaitingPolicy extends ThreadPoolExecutor.AbortPolicy{
		/**
		 * Puts the rejected task into the executor's queue, waiting for space if necessary.
		 *
		 * @param r the rejected task
		 * @param executor the executor that rejected the task
		 * @throws java.util.concurrent.RejectedExecutionException if the calling thread is
		 *         interrupted while waiting; the interrupt status is restored before throwing
		 */
		@Override
		public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
			if (executor.isShutdown()) {
				return;
			}
			try {
				// The task was rejected because the queue is full, so block in put() until there is room
				executor.getQueue().put(r);
			} catch (InterruptedException e) {
				// Keep the interrupt visible to the caller before failing like AbortPolicy
				Thread.currentThread().interrupt();
				super.rejectedExecution(r, executor);
			}
		}		
	}
}
